#!/bin/dash

#Copyright (c) 2014 Luigi Tarenga <luigi.tarenga@gmail.com>
#Distributed under the terms of a MIT license.

#append-entry:
#input  term leader_id prev_log_index prev_log_term entry_term entry_command entry leader_commit
#output term success

# --- server state: each file holds a single line ---
read current_term   < state/current_term
read last_log_index < state/last_log_index
read commit_index   < temp/commit_index
read hostname       < temp/hostname

# --- cluster membership ---
# conf/cluster_nodes is the current member list. conf/cluster_nodes_new,
# when present, is a second member list; cluster_nodes_old_new is then the
# de-duplicated union of both lists, otherwise just the current list.
read cluster_nodes  < conf/cluster_nodes
cluster_nodes_new=""
cluster_nodes_old_new="$cluster_nodes"
if [ -f conf/cluster_nodes_new ]
then
   read cluster_nodes_new < conf/cluster_nodes_new
   cluster_nodes_old_new=$(
      for h in $cluster_nodes $cluster_nodes_new
      do
         echo $h
      done | sort -u
   )
fi

#until this round has been evaluated no quorum of followers is confirmed
echo false          > temp/quorum_heartbeat

#replicate_to_follower PEER
#
#Runs one replication step towards follower PEER and records the outcome in
#temp/PEER_match_index, temp/PEER_next_index and temp/PEER_follower_heartbeat.
#Exactly one request is issued over ssh:
#   entry     - log entry next_index exists locally: send it
#   heartbeat - follower is up to date, or it did not answer last time
#               (follower_heartbeat = 0) and is probed before anything bigger
#   snapshot  - log entry next_index no longer exists locally and the
#               follower is alive: stream state-machine-snapshot/cur
#Reads the globals current_term, hostname, last_log_index, commit_index and
#sshopt. The body is a subshell, so no variable set here leaks to the caller.
#A reply carrying a higher term makes this server step down to follower.
replicate_to_follower() (
   peer=$1
   term=""
   success=""
   sent=""
   next_index_reset=false
   prev_log_index=""

   read next_index         < "temp/${peer}_next_index"
   read follower_heartbeat < "temp/${peer}_follower_heartbeat"

   #NOTE(review): sshopt is not set in this file - presumably exported by the
   #caller. It is left unquoted on purpose so it can expand to several options.
   if [ "$last_log_index" -ge "$next_index" ] && [ -d "state-machine-log/$next_index" ] ; then
      #send entry
      sent=entry
      prev_log_index=$((next_index-1))
      if [ -d "state-machine-log/$prev_log_index" ] ; then
         #the only file in a log directory is named after the entry's term
         prev_log_term=$(echo state-machine-log/$prev_log_index/*)
         prev_log_term=${prev_log_term##state-machine-log/*/}
      else
         read prev_log_term < state-machine-snapshot/cur/last_included_term
      fi
      entry_term=$(echo state-machine-log/$next_index/*)
      entry_term=${entry_term##state-machine-log/*/}
      read entry_command entry_value < "state-machine-log/$next_index/$entry_term"

      ssh $sshopt "$peer" "cd $PWD ; lock_timeout=4 exec internals/activity-lock-ex internals/append-entry " \
         "$current_term $hostname $prev_log_index $prev_log_term '$entry_term' '$entry_command' " \
         "'$entry_value' $commit_index" > "temp/${peer}_append_result" < /dev/null

      read term success < "temp/${peer}_append_result"

   elif [ "$last_log_index" -lt "$next_index" ] || [ "$follower_heartbeat" -eq 0 ] ; then
      #send heartbeat
      sent=heartbeat
      prev_log_index=$((next_index-1))
      if [ -d "state-machine-log/$prev_log_index" ] ; then
         prev_log_term=$(echo state-machine-log/$prev_log_index/*)
         prev_log_term=${prev_log_term##state-machine-log/*/}
      else
         #previous entry is not in the log anymore: check consistency against
         #the snapshot boundary and restart next_index right after it
         read prev_log_index < state-machine-snapshot/cur/last_included_index
         read prev_log_term  < state-machine-snapshot/cur/last_included_term
         next_index=$((prev_log_index+1))
         next_index_reset=true
      fi

      ssh $sshopt "$peer" "cd $PWD ; lock_timeout=4 exec internals/activity-lock-ex internals/append-entry " \
         "$current_term $hostname $prev_log_index $prev_log_term '' '' '' " \
         "$commit_index" > "temp/${peer}_append_result" < /dev/null

      read term success < "temp/${peer}_append_result"

   elif [ "$last_log_index" -ge "$next_index" ] && [ ! -d "state-machine-log/$next_index" ] ; then
      #send snapshot. the log does not exist anymore and the follower is alive
      sent=snapshot
      read prev_log_index < state-machine-snapshot/cur/last_included_index

      pax -wx cpio state-machine-snapshot/cur | \
      ssh $sshopt "$peer" "cd $PWD ; lock_timeout=4 exec internals/activity-lock-ex internals/install-snapshot " \
         "$current_term $hostname" > "temp/${peer}_append_result"

      #install-snapshot reports only a term; a reply in our term is a success
      read term < "temp/${peer}_append_result"
      success="true"
   fi

   #an empty reply (ssh failed) makes term+0 evaluate to 0
   if [ "$((term+0))" -eq "$current_term" ] && [ "$success" = "true" ] ; then
      case "$sent" in
         entry)
            #sent entry, update match_index
            echo "$next_index"       > "temp/${peer}_match_index"
            echo "$((next_index+1))" > "temp/${peer}_next_index"
            ;;
         snapshot)
            #the follower now holds everything up to the snapshot's
            #last_included_index: resume right after it instead of advancing
            #one index per round and re-sending the snapshot each time
            covered=$next_index
            if [ "${prev_log_index:-0}" -gt "$covered" ] ; then
               covered=$prev_log_index
            fi
            echo "$covered"       > "temp/${peer}_match_index"
            echo "$((covered+1))" > "temp/${peer}_next_index"
            ;;
         heartbeat)
            #sent heartbeat: nothing was replicated, so match_index is left
            #alone. if next_index was restarted at the snapshot boundary the
            #follower just confirmed that position, so keep it
            if [ "$next_index_reset" = true ] ; then
               echo "$next_index" > "temp/${peer}_next_index"
            fi
            ;;
      esac
      echo 1                  > "temp/${peer}_follower_heartbeat"
   elif [ "$((term+0))" -eq "$current_term" ] && [ "$success" = "false" ] ; then
      #consistency check failed: decrement next_index for this follower
      echo "$((next_index-1))" > "temp/${peer}_next_index"
      echo 1                  > "temp/${peer}_follower_heartbeat"
   elif [ "$((term+0))" -gt "$current_term" ] ; then
      #found a higher term: step down
      cp conf/election_timeout temp/election_timeout
      echo ""            > state/voted_for
      echo follower      > temp/server_role
      echo "$((term+0))" > state/current_term
      echo false         > temp/quorum_heartbeat
      echo $(date) switching to follower.
   else
      #no usable reply: mark the follower as not answering
      echo 0 > "temp/${peer}_follower_heartbeat"
   fi
)

#contact every other member in parallel, then wait for all of them
for h in $cluster_nodes_old_new; do
   [ "$h" = "$hostname" ] && continue
   replicate_to_follower "$h" &
done

wait

#a follower replying with a higher term rewrites temp/server_role during the
#round above, so the role is re-read here; only a server that is still leader
#continues past this point
read server_role < temp/server_role
if [ "$server_role" != "leader" ] ; then
   exit 0
fi

#advance_commit_index
#
#Works out, from the per-follower files written by the replication round, how
#far the log is replicated on a majority and whether a majority of members
#answered. cluster_nodes and (when non-empty) cluster_nodes_new are evaluated
#as separate configurations and BOTH must agree.
#Reads the globals hostname, current_term, last_log_index, commit_index,
#cluster_nodes and cluster_nodes_new.
#Writes temp/quorum_heartbeat (true/false) and, when the commit point moves,
#temp/commit_index; the commit_index variable is updated as well because the
#code sourced afterwards runs in this same shell.
advance_commit_index() {
   quorum_heartbeat=true
   N=$last_log_index

   for configuration in "$cluster_nodes" "$cluster_nodes_new" ; do
      [ -n "$configuration" ] || continue

      #i: members in this configuration, q: members known to be alive
      i=0; q=0; indexes=""
      for h in $configuration ; do
         if [ "$h" = "$hostname" ] ; then
            #this server counts towards a configuration only when it is
            #actually a member of it
            match_index=$last_log_index
            q=$((q+1))
         else
            #reset first: a failed read must not reuse the previous host's
            #value, and an empty value would break the arithmetic below
            match_index=""; follower_heartbeat=""
            read match_index        < "temp/${h}_match_index"
            read follower_heartbeat < "temp/${h}_follower_heartbeat"
            match_index=${match_index:-0}
            q=$((q+${follower_heartbeat:-0}))
         fi
         indexes="$match_index $indexes"
         i=$((i+1))
      done

      majority=$((i/2+1))
      majority_match=$commit_index
      #with the indexes sorted ascending, the (i+1)/2-th smallest is held by a
      #majority. walk up to it and keep the highest one that is beyond
      #commit_index and was written in the current term
      for match_index in $(printf '%s\n' $indexes | sort -n | head -n "$(( (i+1)/2 ))") ; do
         if [ "$match_index" -gt "$commit_index" ] && [ -f "state-machine-log/$match_index/$current_term" ] ; then
            majority_match=$match_index
         fi
      done

      #N ends up as the smallest majority index over all configurations
      if [ "$majority_match" -lt "$N" ] && [ "$majority_match" -ge "$commit_index" ] ; then
         N=$majority_match
      fi
      if [ "$q" -lt "$majority" ] ; then
         quorum_heartbeat=false
      fi
   done

   if [ "$N" -gt "$commit_index" ] && [ -f "state-machine-log/$N/$current_term" ] ; then
      commit_index=$N
      echo "$commit_index" > temp/commit_index
   fi
   echo "$quorum_heartbeat" > temp/quorum_heartbeat
}

advance_commit_index

. internals/apply-log
